// Copyright 2014 Google Inc.  All Rights Reserved
// Author: Wojtek Żółtak (wojciech.zoltak@gmail.com)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Provides utility classes for generation and validation of data in Views.

// TODO(wzoltak): Move to `utils`?
#ifndef SUPERSONIC_CONTRIB_STORAGE_CORE_TEST_DATA_H_
#define SUPERSONIC_CONTRIB_STORAGE_CORE_TEST_DATA_H_

#include <glog/logging.h>
#include <algorithm>
#include <memory>
#include <random>
#include <set>
#include <utility>
#include <vector>

#include "gtest/gtest.h"


namespace supersonic {

class DataValidator;

// Interface for data generation. Data generation should be deterministic,
// i.e. two generators created with the same arguments should generate exactly
// the same data.
class DataGenerator {
 public:
  DataGenerator() {}
  virtual ~DataGenerator() {}

  // Creates the related validator. The validator expects to be fed the data
  // produced by this generator starting from the beginning of the generator's
  // life, i.e. from the very first generated row.
  virtual std::unique_ptr<DataValidator> CreateValidator() = 0;

  // Generates `row_count` rows of data and returns a pointer to them.
  // NOTE(review): the implementations in this file return a pointer into an
  // internal buffer that is reused by the next call, so the result should be
  // consumed before calling Generate() again.
  virtual VariantConstPointer Generate(rowcount_t row_count) = 0;

 private:
  // Generators are not meant to be copied.
  DISALLOW_COPY_AND_ASSIGN(DataGenerator);
};


// Interface for validation data generated by DataGenerator.
class DataValidator {
 public:
  explicit DataValidator(std::unique_ptr<DataGenerator> generator)
      : generator_(std::move(generator)) {}
  virtual ~DataValidator() {}

  // Validates given data by comparing it with output from underlying
  // generator, ignoring null values.
  virtual void Validate(rowcount_t row_count,
                        VariantConstPointer ptr,
                        bool_const_ptr is_null) = 0;

  // Skips `row_count` from validation. Allows validation starting from
  // particular row.
  void Skip(rowcount_t row_count) {
    const rowcount_t max_step = 1000;
    while (row_count > 0) {
      rowcount_t step = std::min(row_count, max_step);
      generator_->Generate(step);
      row_count -= step;
    }
  }

 protected:
  std::unique_ptr<DataGenerator> generator_;

 private:
  DISALLOW_COPY_AND_ASSIGN(DataValidator);
};


// Validator for basic C++ types. Simply compares values with operator==.
template <DataType T>
class BasicTypeValidator : public DataValidator {
 public:
  typedef typename TypeTraits<T>::cpp_type CppType;
  // G++ 4.6 (default in Ubuntu 12.04) does not support base constructors :(.
  explicit BasicTypeValidator(std::unique_ptr<DataGenerator> generator)
      : DataValidator(std::move(generator)) {}
  ~BasicTypeValidator() {}

  // Compares `row_count` values of type `CppType` found at `ptr` with the next
  // `row_count` values produced by the underlying generator. Rows for which
  // `is_null[i]` is set are skipped. Reports a fatal gtest failure (and stops
  // checking the remaining rows) on the first mismatch.
  void Validate(rowcount_t row_count,
                VariantConstPointer ptr,
                bool_const_ptr is_null) {
    const CppType* data = ptr.as<T>();
    const CppType* expected = generator_->Generate(row_count).as<T>();
    // The index has the same type as the row count so that it can neither
    // overflow nor trigger a signed/unsigned comparison.
    for (rowcount_t i = 0; i < row_count; i++) {
      if (is_null[i]) {
        continue;
      }
      ASSERT_EQ(expected[i], data[i]) << "Value mismatch at row " << i;
    }
  }

 private:
  DISALLOW_COPY_AND_ASSIGN(BasicTypeValidator);
};

// Validator for types represented as StringPieces. Compares length and pieces
// contents.
class VariantTypeValidator : public DataValidator {
 public:
  explicit VariantTypeValidator(std::unique_ptr<DataGenerator> generator)
      : DataValidator(std::move(generator)) {}
  ~VariantTypeValidator() {}

  void Validate(rowcount_t row_count,
                VariantConstPointer ptr,
                bool_const_ptr is_null) {
    const StringPiece* data = ptr.as_variable_length();
    const StringPiece* expected =
        generator_->Generate(row_count).as_variable_length();
    for (int i = 0; i < row_count; i++) {
      if (is_null[i]) {
        continue;
      }
      ASSERT_EQ(data[i].length(), expected[i].length());
      ASSERT_EQ(0, data[i].compare(expected[i]));
    }
  }

 private:
  DISALLOW_COPY_AND_ASSIGN(VariantTypeValidator);
};

// Generator for basic C++ types. Values are taken straight from a
// std::default_random_engine seeded with `seed` and converted to the C++ type
// corresponding to `T`.
template <DataType T>
class BasicTypeGenerator : public DataGenerator {
 public:
  typedef typename TypeTraits<T>::cpp_type CppType;

  explicit BasicTypeGenerator(int seed) : seed_(seed), generator_(seed) {}
  ~BasicTypeGenerator() {}

  // Creates a validator backed by a fresh generator constructed with the same
  // seed, i.e. one that replays this generator's output from the beginning.
  std::unique_ptr<DataValidator> CreateValidator() {
    std::unique_ptr<BasicTypeGenerator>
        generator(new BasicTypeGenerator<T>(seed_));
    return std::unique_ptr<DataValidator>(
        new BasicTypeValidator<T>(std::move(generator)));
  }

  // Generates `row_count` values. The returned pointer refers to an internal
  // buffer which is overwritten by the next call to Generate().
  // NOTE(review): for BOOL every non-zero engine output converts to `true`,
  // so generated booleans are heavily (possibly entirely) biased towards
  // `true` - confirm that this is intended.
  VariantConstPointer Generate(rowcount_t row_count) {
    // Values are stored in-place in the uint64_t slots of `data_`, so every
    // supported C++ type has to fit into a single slot.
    static_assert(sizeof(CppType) <= sizeof(uint64_t),
                  "CppType does not fit into the uint64_t backed buffer");
    data_.resize(row_count);
    // TODO(wzoltak): Use buffer reallocation instead of dirty hack.
    // data() (unlike &data_[0]) is well defined for an empty vector, which
    // makes Generate(0) safe.
    CppType* typed_data = reinterpret_cast<CppType*>(data_.data());
    for (rowcount_t i = 0; i < row_count; i++) {
      typed_data[i] = generator_();
    }
    return VariantConstPointer(typed_data);
  }

 private:
  // Seed the engine was constructed with; reused by CreateValidator().
  int seed_;
  std::default_random_engine generator_;
  // Raw storage for generated values, reinterpreted as an array of CppType.
  std::vector<uint64_t> data_;
  DISALLOW_COPY_AND_ASSIGN(BasicTypeGenerator);
};

// Generator for types represented as StringPieces. Every generated value is
// one of the given `pieces`, picked using a std::default_random_engine seeded
// with `seed`.
class VariantTypeGenerator : public DataGenerator {
 public:
  VariantTypeGenerator(int seed,
                       std::shared_ptr<std::vector<StringPiece> > pieces)
      : seed_(seed), pieces_(pieces), generator_(seed) {}
  ~VariantTypeGenerator() {}

  // Creates a validator backed by a fresh generator constructed with the same
  // seed and pieces, i.e. one that replays this generator's output from the
  // beginning.
  std::unique_ptr<DataValidator> CreateValidator() {
    std::unique_ptr<VariantTypeGenerator>
        generator(new VariantTypeGenerator(seed_, pieces_));
    return std::unique_ptr<DataValidator>(
        new VariantTypeValidator(std::move(generator)));
  }

  // Generates `row_count` StringPieces picked from `pieces_`. The returned
  // pointer refers to an internal buffer which is overwritten by the next call
  // to Generate(). Requires a non-empty set of pieces when `row_count` > 0.
  VariantConstPointer Generate(rowcount_t row_count) {
    if (row_count > 0) {
      // Without pieces the pick below would dereference a null pointer or
      // compute a modulo by zero.
      DCHECK(pieces_.get() != nullptr && !pieces_->empty())
          << "VariantTypeGenerator needs at least one piece to pick from";
    }
    data_.resize(row_count);
    for (rowcount_t i = 0; i < row_count; i++) {
      data_[i] = (*pieces_)[generator_() % pieces_->size()];
    }
    // data() (unlike &data_[0]) is well defined for an empty vector, which
    // makes Generate(0) safe.
    return VariantConstPointer(data_.data());
  }

 private:
  // Seed the engine was constructed with; reused by CreateValidator().
  int seed_;
  // Pool of values to pick from; shared with the validators' generators.
  std::shared_ptr<std::vector<StringPiece> > pieces_;
  std::default_random_engine generator_;
  // Buffer holding the most recently generated rows.
  std::vector<StringPiece> data_;
  DISALLOW_COPY_AND_ASSIGN(VariantTypeGenerator);
};

// Creates generator for given data type. `pieces` is a vector of StringPieces
// which will be randomly picked for variant types (STRING and BINARY); it is
// not used for the remaining types. Returns an empty pointer (after a DCHECK
// failure in debug builds) when `type` is not supported.
//
// Declared `inline` because it is defined in a header: without it every
// translation unit including this file would emit its own definition and
// linking more than one of them would fail.
inline std::unique_ptr<DataGenerator>
    CreateGenerator(DataType type,
                    int seed,
                    std::shared_ptr<std::vector<StringPiece> > pieces) {
  switch (type) {
    case INT32:
      return std::unique_ptr<DataGenerator>(
          new BasicTypeGenerator<INT32>(seed));
    case UINT32:
      return std::unique_ptr<DataGenerator>(
          new BasicTypeGenerator<UINT32>(seed));
    case INT64:
      return std::unique_ptr<DataGenerator>(
          new BasicTypeGenerator<INT64>(seed));
    case UINT64:
      return std::unique_ptr<DataGenerator>(
          new BasicTypeGenerator<UINT64>(seed));
    case FLOAT:
      return std::unique_ptr<DataGenerator>(
          new BasicTypeGenerator<FLOAT>(seed));
    case DOUBLE:
      return std::unique_ptr<DataGenerator>(
          new BasicTypeGenerator<DOUBLE>(seed));
    case DATE:
      return std::unique_ptr<DataGenerator>(
          new BasicTypeGenerator<DATE>(seed));
    case DATETIME:
      return std::unique_ptr<DataGenerator>(
          new BasicTypeGenerator<DATETIME>(seed));
    case BOOL:
      return std::unique_ptr<DataGenerator>(
          new BasicTypeGenerator<BOOL>(seed));
    // Both variable-length types are generated the same way.
    case STRING:
    case BINARY:
      return std::unique_ptr<DataGenerator>(
          new VariantTypeGenerator(seed, pieces));
    default:
      DCHECK(false) << "Unsupported data type: " << type;
      return std::unique_ptr<DataGenerator>();
  }
}


// Validates data generated by Generator.
class Validator {
 public:
  Validator(TupleSchema schema,
            const std::vector<std::unique_ptr<DataGenerator> >& generators)
      : schema_(schema) {
    for (const std::unique_ptr<DataGenerator>& generator : generators) {
      validators_.push_back(generator->CreateValidator());
    }
  }

  void Validate(const View& view) {
    rowcount_t row_count = view.row_count();
    bool_const_ptr zeroes = new bool[row_count]();

    for (int i = 0; i < schema_.attribute_count(); i++) {
      const Attribute& attribute = schema_.attribute(i);
      VariantConstPointer data = view.column(i).data();

      if (attribute.is_nullable()) {
        VariantConstPointer is_null(view.column(i).is_null());
        validators_[2 * i + 1]->Validate(row_count, is_null, zeroes);
      }

      bool_const_ptr is_null = attribute.is_nullable() ?
          view.column(i).is_null() : zeroes;
      validators_[2 * i]->Validate(row_count, data, is_null);
    }

    delete[] zeroes;
  }

  // TODO(wzoltak): Quite ugly. Do better?
  void ShrinkTo(const std::vector<int>& columns) {
    TupleSchema new_schema;
    std::set<int> columns_set(columns.begin(), columns.end());

    int removed = 0;
    for (int i = 0; i < schema_.attribute_count(); i++) {
      if (columns_set.find(i) == columns_set.end()) {
        int j = i - removed;
        validators_.erase(validators_.begin() + 2 * j + 1);
        validators_.erase(validators_.begin() + 2 * j);
        removed++;
      } else {
        new_schema.add_attribute(schema_.attribute(i));
      }
    }
    schema_ = new_schema;
  }

  // Skips `row_count` from validation. Allows validation starting from
  // particular row.
  void Skip(rowcount_t row_count) {
    for (auto& data_validator : validators_) {
      data_validator->Skip(row_count);
    }
  }

 private:
  TupleSchema schema_;
  std::vector<std::unique_ptr<DataValidator>> validators_;
  DISALLOW_COPY_AND_ASSIGN(Validator);
};


// Produces Views filled with generated data matching a given schema.
//
// Keeps two DataGenerators per schema attribute: the one at index 2 * i
// produces the values of column i and the one at index 2 * i + 1 (always a
// BOOL generator) produces its is_null flags.
class Generator {
 public:
  // `seeds` has to provide one number for each attribute of `schema`; the same
  // seed is used for the values and for the is_null flags of an attribute.
  // StringPieces from `pieces` are randomly picked as values of variant types.
  Generator(TupleSchema schema,
            const int seeds[],
            std::shared_ptr<std::vector<StringPiece> > pieces)
      : schema_(schema), view_(new View(schema)) {
    const int attribute_count = schema_.attribute_count();
    for (int column = 0; column < attribute_count; ++column) {
      const int seed = seeds[column];
      const DataType type = schema_.attribute(column).type();
      generators_.push_back(CreateGenerator(type, seed, pieces));
      generators_.push_back(CreateGenerator(BOOL, seed, pieces));
    }
  }

  // Fills the internal view with `row_count` freshly generated rows and
  // returns it. The is_null generator is only advanced for nullable
  // attributes; other columns get no is_null flags at all. The returned view
  // is reused (and its data overwritten) by the next call.
  const View& Generate(rowcount_t row_count) {
    const int attribute_count = schema_.attribute_count();
    for (int column = 0; column < attribute_count; ++column) {
      VariantConstPointer values =
          generators_[2 * column]->Generate(row_count);
      const bool* null_flags = schema_.attribute(column).is_nullable()
          ? generators_[2 * column + 1]->Generate(row_count).as<BOOL>()
          : nullptr;
      view_->mutable_column(column)->Reset(values, null_flags);
    }
    view_->set_row_count(row_count);
    return *view_;
  }

  // Creates a Validator able to verify the data produced by this generator,
  // starting from the first row it has ever generated.
  std::unique_ptr<Validator> CreateValidator() {
    return std::unique_ptr<Validator>(new Validator(schema_, generators_));
  }

 private:
  TupleSchema schema_;
  // View handed out by Generate(); points into the generators' buffers.
  std::unique_ptr<View> view_;
  // Two entries per attribute of `schema_`: values generator at 2 * i,
  // is_null generator at 2 * i + 1.
  std::vector<std::unique_ptr<DataGenerator> > generators_;
  DISALLOW_COPY_AND_ASSIGN(Generator);
};

}  // namespace supersonic

#endif  // SUPERSONIC_CONTRIB_STORAGE_CORE_TEST_DATA_H_
